package com.bdf.congcache.utils.threadpool;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * Thread pool manager: a lazily created singleton that hands out named
 * {@link ThreadPoolExecutor}s and creates each one on first request.
 *
 * <p>Pool settings are read from the {@link Properties} supplied through
 * {@link #setProps(Properties)}. Keys have the form
 * {@code thread_pool.<poolName>.<setting>}; the keys under
 * {@code thread_pool.default} describe the default configuration that every
 * named pool inherits for the settings it does not define itself.
 *
 * @author 田培融
 * @since 2019/7/10
 */
public class ThreadPoolManager {

    private static final Log log = LogFactory.getLog(ThreadPoolManager.class);

    /** Root of the property keys of a named pool. */
    private static final String PROP_NAME_ROOT = "thread_pool";

    /** Root of the property keys of the default configuration. */
    private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";

    // Built-in defaults, used for every setting that is not configured at all.
    // The boundary default is false because that is the value the loader has
    // always applied when "useBoundary" is absent.
    private static final boolean DEFAULT_USE_BOUNDARY = false;

    private static final int DEFAULT_BOUNDARY_SIZE = 2000;

    private static final int DEFAULT_MAXIMUM_POOL_SIZE = 150;

    private static final int DEFAULT_MINIMUM_POOL_SIZE = 4;

    private static final int DEFAULT_KEEP_ALIVE_TIME = 1000 * 60 * 5;

    private static final PoolConfiguration.WhenBlockedPolicy DEFAULT_WHEN_BLOCKED_POLICY =
            PoolConfiguration.WhenBlockedPolicy.RUN;

    private static final int DEFAULT_START_UP_SIZE = 4;

    /** The singleton; created by {@link #getInstance()}, cleared by {@link #dispose()}. */
    private static ThreadPoolManager instance = null;

    /** Default pool configuration, rebuilt every time a manager instance is created. */
    private static PoolConfiguration defaultConfig;

    /** Configuration properties, expected to be supplied at system start-up. */
    private static Properties props = null;

    /** All pools created so far, keyed by pool name. */
    private final ConcurrentHashMap<String, ThreadPoolExecutor> pools;

    /** Creates the (single) manager and loads the default pool configuration. */
    private ThreadPoolManager()
    {
        this.pools = new ConcurrentHashMap<String, ThreadPoolExecutor>();
        configure();
    }

    /**
     * Builds the default configuration: the built-in defaults overlaid with
     * whatever is configured under {@code thread_pool.default}.
     */
    private static void configure()
    {
        if (log.isDebugEnabled())
        {
            log.debug("initialize ThreadPoolManager");
        }

        if (props == null)
        {
            props = new Properties();
        }

        // loadConfig() clones defaultConfig, so seed it with the built-in values first.
        defaultConfig = builtInDefaults();
        defaultConfig = loadConfig(DEFAULT_PROP_NAME_ROOT);
    }

    /** Returns a fresh configuration holding only the built-in default values. */
    private static PoolConfiguration builtInDefaults()
    {
        return new PoolConfiguration(DEFAULT_USE_BOUNDARY, DEFAULT_BOUNDARY_SIZE, DEFAULT_MAXIMUM_POOL_SIZE,
                DEFAULT_MINIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_WHEN_BLOCKED_POLICY,
                DEFAULT_START_UP_SIZE);
    }

    /**
     * Creates an executor for the given configuration and prestarts its core threads.
     *
     * <p>Note: with an unbounded queue ({@code useBoundary == false}) a
     * {@link ThreadPoolExecutor} never grows beyond its core size, so the
     * configured maximum pool size only takes effect for bounded queues.
     *
     * @param config settings of the pool to create
     * @return the new executor, with all core threads already started
     * @throws RuntimeException if the configured policy is {@code WAIT}, which is no longer supported
     * @throws IllegalArgumentException if the sizes are rejected by the
     *         {@link ThreadPoolExecutor} constructor (e.g. start-up size above maximum size)
     */
    private ThreadPoolExecutor createPool(PoolConfiguration config) {

        // Pick the rejection handler first so an unsupported policy fails
        // before any executor has been built.
        RejectedExecutionHandler handler;
        switch (config.getWhenBlockedPolicy()) {
            case RUN:
                handler = new ThreadPoolExecutor.CallerRunsPolicy();
                break;

            case WAIT:
                throw new RuntimeException("POLICY_WAIT no longer supported");

            case DISCARDOLDEST:
                handler = new ThreadPoolExecutor.DiscardOldestPolicy();
                break;

            case ABORT:
            default:
                // AbortPolicy is also what ThreadPoolExecutor uses when no handler is given.
                handler = new ThreadPoolExecutor.AbortPolicy();
                break;
        }

        // Bounded or unbounded work queue, depending on the configuration.
        BlockingQueue<Runnable> queue;
        if (config.isUseBoundary()) {
            queue = new LinkedBlockingQueue<Runnable>(config.getBoundarySize());
        } else {
            queue = new LinkedBlockingQueue<Runnable>();
        }

        ThreadPoolExecutor pool = new ThreadPoolExecutor(config.getStartUpSize(), config.getMaximumPoolSize(),
                config.getKeepAliveTime(), TimeUnit.MILLISECONDS, queue,
                new CongCacheThreadFactory("CongCache-ThreadPoolManager-"), handler);

        pool.prestartAllCoreThreads();
        return pool;
    }

    /**
     * Shuts down every pool of the current manager instance (via
     * {@link ThreadPoolExecutor#shutdownNow()}) and discards the instance, so
     * the next {@link #getInstance()} call builds a new manager.
     * Does nothing when no instance exists.
     */
    public static synchronized void dispose() {
        if (instance == null) {
            return;
        }

        // Read the map directly: getPool() would create a pool on a miss,
        // which must never happen while shutting down.
        for (String poolName : instance.pools.keySet()) {
            ThreadPoolExecutor pool = instance.pools.get(poolName);
            if (pool == null) {
                continue;
            }
            try {
                pool.shutdownNow();
            } catch (RuntimeException e) {
                // Best effort: keep closing the remaining pools.
                log.warn("fail  to close pool " + poolName, e);
            }
        }
        instance = null;
    }

    /**
     * Returns the pool registered under the given name, creating it from the
     * configuration {@code thread_pool.<name>.*} when it does not exist yet.
     *
     * @param name name of the pool
     * @return the executor registered under {@code name}
     */
    public ThreadPoolExecutor getPool(String name) {
        ThreadPoolExecutor pool = pools.get(name);
        if (pool != null) {
            return pool;
        }

        if (log.isDebugEnabled()) {
            log.debug("create pool for name [" + name + "]");
        }

        PoolConfiguration config = loadConfig(PROP_NAME_ROOT + "." + name);
        ThreadPoolExecutor created = createPool(config);
        ThreadPoolExecutor existing = pools.putIfAbsent(name, created);

        if (existing == null) {
            pool = created;
        } else {
            // Another thread registered this pool first. Our executor has
            // already prestarted its core threads, so stop it to avoid
            // leaking them, and hand out the registered pool instead.
            created.shutdownNow();
            pool = existing;
        }

        if (log.isDebugEnabled()) {
            log.debug("PoolName = " + getPoolNames());
        }
        return pool;
    }

    /**
     * Returns the names of all pools created so far.
     *
     * @return a new list containing a snapshot of the pool names
     */
    public ArrayList<String> getPoolNames() {
        return new ArrayList<String>(pools.keySet());
    }

    /**
     * Loads the configuration stored under the given property root.
     *
     * <p>The result starts as a copy of the default configuration (or of the
     * built-in defaults when the manager has not been initialised yet); only
     * settings that are actually present in the properties override it. A
     * numeric setting that cannot be parsed is logged and ignored.
     *
     * @param threadPoolName property root, e.g. {@code thread_pool.default} or {@code thread_pool.<name>}
     * @return a new configuration object; never the shared default instance
     */
    public static PoolConfiguration loadConfig(String threadPoolName) {
        PoolConfiguration base = defaultConfig;
        PoolConfiguration config = (base != null) ? base.clone() : builtInDefaults();

        // No properties supplied (yet) means nothing to override.
        Properties source = props;
        if (source != null) {
            String prefix = threadPoolName + ".";

            String useBoundary = source.getProperty(prefix + "useBoundary");
            if (useBoundary != null) {
                config.setUseBoundary(Boolean.parseBoolean(useBoundary.trim()));
            }

            Integer boundarySize = readInt(source, prefix, "boundarySize");
            if (boundarySize != null) {
                config.setBoundarySize(boundarySize.intValue());
            }

            Integer maximumPoolSize = readInt(source, prefix, "maximumPoolSize");
            if (maximumPoolSize != null) {
                config.setMaximumPoolSize(maximumPoolSize.intValue());
            }

            Integer minimumPoolSize = readInt(source, prefix, "minimumPoolSize");
            if (minimumPoolSize != null) {
                config.setMinimumPoolSize(minimumPoolSize.intValue());
            }

            Integer keepAliveTime = readInt(source, prefix, "keepAliveTime");
            if (keepAliveTime != null) {
                config.setKeepAliveTime(keepAliveTime.intValue());
            }

            String whenBlockedPolicy = source.getProperty(prefix + "whenBlockedPolicy");
            if (whenBlockedPolicy != null) {
                config.setWhenBlockedPolicy(whenBlockedPolicy.trim());
            }

            Integer startUpSize = readInt(source, prefix, "startUpSize");
            if (startUpSize != null) {
                config.setStartUpSize(startUpSize.intValue());
            }
        }

        if (log.isInfoEnabled()) {
            log.info(threadPoolName + " PoolConfiguration = " + config);
        }

        return config;
    }

    /**
     * Reads an integer setting.
     *
     * @return the parsed value, or {@code null} when the property is absent or
     *         not a valid integer (the latter is logged as an error)
     */
    private static Integer readInt(Properties source, String prefix, String setting) {
        String raw = source.getProperty(prefix + setting);
        if (raw == null) {
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException nfe) {
            log.error(setting + " not a number.", nfe);
            return null;
        }
    }

    /**
     * Returns the singleton manager, creating and configuring it on first use
     * (or on first use after {@link #dispose()}).
     *
     * @return the shared manager instance
     */
    public static synchronized ThreadPoolManager getInstance()
    {
        if (instance == null)
        {
            instance = new ThreadPoolManager();
        }
        return instance;
    }

    /**
     * Sets the properties the pool configurations are read from. The default
     * configuration is only rebuilt when a manager instance is created, so
     * call this before the first {@link #getInstance()}.
     *
     * @param props configuration properties; {@code null} means "no configuration"
     */
    public static void setProps(Properties props)
    {
        ThreadPoolManager.props = props;
    }
}
